package com.study.flow;

import com.study.util.Utils;

import java.util.Random;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * Small demonstration of the {@link Flow} publish/subscribe API.
 *
 * <p>{@link #main(String[])} creates a {@link SubmissionPublisher}, attaches three
 * {@link StringSubscribe} subscribers to it, submits a single item and then waits
 * before the publisher is closed by the try-with-resources statement.
 *
 * @author tc
 * @date 2019/1/1
 */
public class FlowDemo {

    /** How long (in milliseconds) the main thread waits before the publisher is closed. */
    private static final long WAIT_MILLIS = 2 * 10000L;

    /**
     * Runs the demo: subscribes three named subscribers, submits one item and waits
     * {@link #WAIT_MILLIS} milliseconds so the subscribers have time to print their output.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {

            publisher.subscribe(new StringSubscribe("A"));
            publisher.subscribe(new StringSubscribe("B"));
            publisher.subscribe(new StringSubscribe("C"));

            publisher.submit("Hello World");

            // Plain sleep instead of the main thread joining itself; the effect (a timed
            // wait on the current thread) is the same but the intent is explicit.
            Thread.sleep(WAIT_MILLIS);

        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
        }

    }

    /**
     * Subscriber that requests exactly one item and, on receiving it, randomly either
     * cancels its subscription or throws to simulate a processing failure.
     */
    public static class StringSubscribe implements Flow.Subscriber<String> {

        private static final Random random = new Random();

        /** Subscription handle, stored in {@link #onSubscribe} and used by {@link #onNext}. */
        private Flow.Subscription subscription;

        /** Label used in every line this subscriber prints. */
        private final String name;

        private StringSubscribe(String name) {
            this.name = name;
        }

        /**
         * Stores the subscription and requests a single item.
         *
         * @param subscription the subscription handed over by the publisher
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            Utils.println("订阅者 [" + name + "] 开始订阅");
            // Keep the handle BEFORE requesting: a publisher is allowed to deliver items
            // from inside request(), and onNext needs the field to be set by then.
            this.subscription = subscription;
            // Ask the publisher for one item.
            subscription.request(1);
        }

        /**
         * Prints the received item, then either cancels the subscription or throws,
         * chosen at random.
         *
         * @param item the item delivered by the publisher
         * @throws RuntimeException on the randomly chosen "failure" path
         */
        @Override
        public void onNext(String item) {
            Utils.println("订阅者 [" + name + "] 接收数据：" + item);
            if (random.nextBoolean()){
                subscription.cancel();
            } else {
                // Simulated failure. NOTE(review): with SubmissionPublisher this is expected
                // to be reported through onError — confirm against the JDK documentation.
                // A message is supplied so onError does not print "null".
                throw new RuntimeException("模拟处理失败");
            }
        }

        /**
         * Prints the message of the reported error.
         *
         * @param throwable the error reported for this subscription
         */
        @Override
        public void onError(Throwable throwable) {
            Utils.println("订阅者 [" + name + "] 订阅异常: " + throwable.getMessage());
        }

        /** Prints a notice that the subscription completed. */
        @Override
        public void onComplete() {
            Utils.println("订阅者 [" + name + "] 完成订阅");
        }

    }

}
